from kafka import KafkaProducer
from kafka.errors import KafkaError

# 设置 Kafka 服务器地址（替换为 B 服务器的 IP 地址）
producer = KafkaProducer(
    bootstrap_servers=['10.48.0.4:9092'],
    request_timeout_ms=60000  # 增加超时时间
)

# 测试消息内容
test_message = "这是一个测试消息"

# 定义回调函数，成功或失败时调用
def on_send_success(record_metadata):
    print("消息成功发送到 Kafka 服务器")
    print("主题:", record_metadata.topic)
    print("分区:", record_metadata.partition)
    print("偏移量:", record_metadata.offset)

def on_send_error(excp):
    print("消息发送失败:", excp)

# 发送测试消息
producer.send('test_topic', value=test_message.encode('utf-8')).add_callback(on_send_success).add_errback(on_send_error)

# 确保所有消息发送完成
producer.flush()



#我的kafka服务器部署在A服务器，并且在A服务器（10.50.0.4）上运行KafkaProducer脚本能正常send；同样的方式在B服务器（10.254.120.201）无法send，并报错，如下：
#消息发送失败: KafkaTimeoutError: Batch for TopicPartition(topic='test_topic', partition=0) containing 1 record(s) expired: 35 seconds have passed since batch creation plus linger time
